#!/usr/bin/env python

'''
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
import Queue

import logging
import traceback
import threading
import pprint
import os
import json

from AgentException import AgentException
from LiveStatus import LiveStatus
from shell import shellRunner
from ActualConfigHandler import ActualConfigHandler
from CommandStatusDict import CommandStatusDict
from CustomServiceOrchestrator import CustomServiceOrchestrator
from ambari_agent.BackgroundCommandExecutionHandle import BackgroundCommandExecutionHandle


# Root logger; handlers/level are configured by the agent's main entry point.
logger = logging.getLogger()
# NOTE(review): appears unused within this file — possibly a legacy global
# kept for compatibility; verify against other agent modules before removing.
installScriptHash = -1

class ActionQueue(threading.Thread):
  """ Action Queue for the agent. We pick one command at a time from the queue
  and execute it
  Note: Action and command terms in this and related classes are used interchangeably
  """

  # How many actions can be performed in parallel. Feel free to change
  MAX_CONCURRENT_ACTIONS = 5


  # How much time(in seconds) we need wait for new incoming execution command
  # before checking status command queue
  EXECUTION_COMMAND_WAIT_TIME = 2

  STATUS_COMMAND = 'STATUS_COMMAND'
  EXECUTION_COMMAND = 'EXECUTION_COMMAND'
  BACKGROUND_EXECUTION_COMMAND = 'BACKGROUND_EXECUTION_COMMAND'
  ROLE_COMMAND_INSTALL = 'INSTALL'
  ROLE_COMMAND_START = 'START'
  ROLE_COMMAND_STOP = 'STOP'
  ROLE_COMMAND_CUSTOM_COMMAND = 'CUSTOM_COMMAND'
  CUSTOM_COMMAND_RESTART = 'RESTART'

  IN_PROGRESS_STATUS = 'IN_PROGRESS'
  COMPLETED_STATUS = 'COMPLETED'
  FAILED_STATUS = 'FAILED'

  def __init__(self, config, controller):
    """
    :param config: agent configuration object; its 'agent'/'prefix' value is
                   used as the directory for per-task output/error files
    :param controller: Controller instance whose heartbeat_wait_event is set
                       whenever a command status changes
    """
    super(ActionQueue, self).__init__()
    self.commandQueue = Queue.Queue()
    self.statusCommandQueue = Queue.Queue()
    self.backgroundCommandQueue = Queue.Queue()
    self.commandStatuses = CommandStatusDict(callback_action =
      self.status_update_callback)
    self.config = config
    self.controller = controller
    self.sh = shellRunner()
    self.configTags = {}
    self._stop = threading.Event()
    self.tmpdir = config.get('agent', 'prefix')
    self.customServiceOrchestrator = CustomServiceOrchestrator(config, controller)


  def stop(self):
    """Signal the worker thread to exit after its current loop iteration."""
    self._stop.set()

  def stopped(self):
    """Return True once stop() has been called."""
    return self._stop.is_set()

  def put_status(self, commands):
    """Replace all pending status commands with the given batch.

    The server sends the full set of status commands each time, so any
    still-queued status commands are stale and are dropped first.
    """
    # Clear the underlying deque while holding the queue's mutex so the
    # consumer thread cannot interleave with a half-cleared queue
    # (same pattern as reset()).
    with self.statusCommandQueue.mutex:
      self.statusCommandQueue.queue.clear()

    for command in commands:
      logger.info("Adding " + command['commandType'] + " for service " + \
                    command['serviceName'] + " of cluster " + \
                    command['clusterName'] + " to the queue.")
      logger.debug(pprint.pformat(command))
      self.statusCommandQueue.put(command)

  def put(self, commands):
    """Enqueue execution commands.

    Background commands get a BackgroundCommandExecutionHandle attached and
    go to backgroundCommandQueue; everything else goes to commandQueue.
    Missing serviceName/clusterName fields are filled with the string "null".
    """
    for command in commands:
      if 'serviceName' not in command:
        command['serviceName'] = "null"
      if 'clusterName' not in command:
        command['clusterName'] = 'null'

      logger.info("Adding " + command['commandType'] + " for service " + \
                  command['serviceName'] + " of cluster " + \
                  command['clusterName'] + " to the queue.")
      logger.debug(pprint.pformat(command))
      if command['commandType'] == self.BACKGROUND_EXECUTION_COMMAND:
        self.backgroundCommandQueue.put(self.createCommandHandle(command))
      else:
        self.commandQueue.put(command)

  def cancel(self, commands):
    """Cancel commands by target_task_id.

    A still-queued command is removed from the pending queue; a running one
    is killed via the orchestrator.
    """
    for command in commands:

      logger.info("Canceling command {tid}".format(tid = str(command['target_task_id'])))
      logger.debug(pprint.pformat(command))

      task_id = command['target_task_id']
      reason = command['reason']

      # Remove from the command queue by task_id: swap in a fresh queue and
      # re-add every command except the canceled one.
      # NOTE(review): the swap is not atomic with respect to the consumer
      # thread — run() may take a command between the swap and the re-add;
      # confirm this window is acceptable.
      queue = self.commandQueue
      self.commandQueue = Queue.Queue()

      while not queue.empty():
        queued_command = queue.get(False)
        if queued_command['task_id'] != task_id:
          self.commandQueue.put(queued_command)
        else:
          logger.info("Canceling " + queued_command['commandType'] + \
                      " for service " + queued_command['serviceName'] + \
                      " of cluster " + queued_command['clusterName'] + \
                      " from the queue.")

      # Kill if in progress
      self.customServiceOrchestrator.cancel_command(task_id, reason)

  def run(self):
    """Worker loop: drain background and status queues, then block for up to
    EXECUTION_COMMAND_WAIT_TIME seconds waiting for an execution command."""
    while not self.stopped():
      self.processBackgroundQueueSafeEmpty()
      self.processStatusCommandQueueSafeEmpty()
      try:
        command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
        self.process_command(command)
      except Queue.Empty:
        pass

  def processBackgroundQueueSafeEmpty(self):
    """Start every queued background command that has not run yet
    (i.e. whose handle status is still None)."""
    while not self.backgroundCommandQueue.empty():
      try:
        command = self.backgroundCommandQueue.get(False)
        if '__handle' in command and command['__handle'].status is None:
          self.process_command(command)
      except Queue.Empty:
        # Another consumer emptied the queue between empty() and get()
        pass

  def processStatusCommandQueueSafeEmpty(self):
    """Process all currently queued status commands."""
    while not self.statusCommandQueue.empty():
      try:
        command = self.statusCommandQueue.get(False)
        self.process_command(command)
      except Queue.Empty:
        pass


  def createCommandHandle(self, command):
    """Attach a BackgroundCommandExecutionHandle to the command and return it.

    :raises AgentException: if the command already carries a '__handle'
    """
    if '__handle' in command:
      raise AgentException("Command already has __handle")
    command['__handle'] = BackgroundCommandExecutionHandle(command, command['commandId'], None, self.on_background_command_complete_callback)
    return command

  def process_command(self, command):
    """Dispatch one command to the proper executor based on commandType."""
    logger.debug("Took an element of Queue: " + pprint.pformat(command))
    # make sure we log failures
    commandType = command['commandType']
    try:
      if commandType in [self.EXECUTION_COMMAND, self.BACKGROUND_EXECUTION_COMMAND]:
        self.execute_command(command)
      elif commandType == self.STATUS_COMMAND:
        self.execute_status_command(command)
      else:
        logger.error("Unrecognized command " + pprint.pformat(command))
    except Exception as err:
      # Should not happen
      traceback.print_exc()
      logger.warn(err)

  def execute_command(self, command):
    '''
    Executes commands of type EXECUTION_COMMAND and
    BACKGROUND_EXECUTION_COMMAND: reports IN_PROGRESS, runs the command via
    the orchestrator and (for foreground commands) reports the final status.
    '''
    clusterName = command['clusterName']
    commandId = command['commandId']
    isCommandBackground = command['commandType'] == self.BACKGROUND_EXECUTION_COMMAND
    message = "Executing command with id = {commandId} for role = {role} of " \
              "cluster {cluster}.".format(
              commandId = str(commandId), role=command['role'],
              cluster=clusterName)
    logger.info(message)
    logger.debug(pprint.pformat(command))

    taskId = command['taskId']
    # Preparing 'IN_PROGRESS' report
    in_progress_status = self.commandStatuses.generate_report_template(command)
    # The path of the files that contain the output log and error log use a prefix that the agent advertises to the
    # server. The prefix is defined in agent-config.ini
    in_progress_status.update({
      'tmpout': self.tmpdir + os.sep + 'output-' + str(taskId) + '.txt',
      'tmperr': self.tmpdir + os.sep + 'errors-' + str(taskId) + '.txt',
      'structuredOut' : self.tmpdir + os.sep + 'structured-out-' + str(taskId) + '.json',
      'status': self.IN_PROGRESS_STATUS
    })
    self.commandStatuses.put_command_status(command, in_progress_status)

    # running command
    commandresult = self.customServiceOrchestrator.runCommand(command,
      in_progress_status['tmpout'], in_progress_status['tmperr'])

    # dumping results
    if isCommandBackground:
      # Final status of a background command is reported later from
      # on_background_command_complete_callback.
      return
    status = self.COMPLETED_STATUS if commandresult['exitcode'] == 0 else self.FAILED_STATUS
    roleResult = self.commandStatuses.generate_report_template(command)
    roleResult.update({
      'stdout': commandresult['stdout'],
      'stderr': commandresult['stderr'],
      'exitCode': commandresult['exitcode'],
      'status': status,
    })
    # Empty output fields are replaced with the literal string 'None'
    if roleResult['stdout'] == '':
      roleResult['stdout'] = 'None'
    if roleResult['stderr'] == '':
      roleResult['stderr'] = 'None'

    # let ambari know name of custom command
    if 'custom_command' in command['hostLevelParams']:
      roleResult['customCommand'] = command['hostLevelParams']['custom_command']

    if 'structuredOut' in commandresult:
      roleResult['structuredOut'] = str(json.dumps(commandresult['structuredOut']))
    else:
      roleResult['structuredOut'] = ''

    # let ambari know that configuration tags were applied
    if status == self.COMPLETED_STATUS:
      configHandler = ActualConfigHandler(self.config, self.configTags)
      # update component tags that the server explicitly asked to refresh
      if 'forceRefreshConfigTags' in command and len(command['forceRefreshConfigTags']) > 0:

        forceRefreshConfigTags = command['forceRefreshConfigTags']
        logger.info("Got refresh additional component tags command")

        for configTag in forceRefreshConfigTags:
          configHandler.update_component_tag(command['role'], configTag, command['configurationTags'][configTag])

        roleResult['customCommand'] = self.CUSTOM_COMMAND_RESTART # force restart for component to evict stale_config on server side
        command['configurationTags'] = configHandler.read_actual_component(command['role'])

      if 'configurationTags' in command:
        configHandler.write_actual(command['configurationTags'])
        roleResult['configurationTags'] = command['configurationTags']
      component = {'serviceName':command['serviceName'],'componentName':command['role']}
      # Persist per-component tags on START, on INSTALL of a client component,
      # and on a custom RESTART command
      if 'roleCommand' in command and \
        (command['roleCommand'] == self.ROLE_COMMAND_START or \
        (command['roleCommand'] == self.ROLE_COMMAND_INSTALL \
        and component in LiveStatus.CLIENT_COMPONENTS) or \
        (command['roleCommand'] == self.ROLE_COMMAND_CUSTOM_COMMAND and \
        'custom_command' in command['hostLevelParams'] and \
        command['hostLevelParams']['custom_command'] == self.CUSTOM_COMMAND_RESTART)):
        configHandler.write_actual_component(command['role'], command['configurationTags'])
        if 'clientsToUpdateConfigs' in command['hostLevelParams'] and \
          command['hostLevelParams']['clientsToUpdateConfigs']:
          configHandler.write_client_components(command['serviceName'], command['configurationTags'],
                                                command['hostLevelParams']['clientsToUpdateConfigs'])
        roleResult['configurationTags'] = configHandler.read_actual_component(command['role'])

    self.commandStatuses.put_command_status(command, roleResult)

  def command_was_canceled(self):
    # NOTE(review): the original body was the bare expression
    # 'self.customServiceOrchestrator' which had no effect. Kept as an
    # explicit no-op for interface compatibility; confirm whether this
    # method is still referenced anywhere before deleting it.
    pass

  def on_background_command_complete_callback(self, process_condenced_result, handle):
    """Build and store the final status report for a finished background command.

    :param process_condenced_result: dict with 'stdout', 'stderr', 'exitcode'
                                     and optionally 'structuredOut'
    :param handle: BackgroundCommandExecutionHandle of the finished command
    """
    logger.debug('Start callback: %s' % process_condenced_result)
    logger.debug('The handle is: %s' % handle)
    status = self.COMPLETED_STATUS if handle.exitCode == 0 else self.FAILED_STATUS

    # A canceled command is forced to FAILED and the cancel reason is
    # appended to its stdout/stderr
    aborted_postfix = self.customServiceOrchestrator.command_canceled_reason(handle.command['taskId'])
    if aborted_postfix:
      status = self.FAILED_STATUS
      logger.debug('Set status to: %s , reason = %s' % (status, aborted_postfix))
    else:
      aborted_postfix = ''

    roleResult = self.commandStatuses.generate_report_template(handle.command)

    roleResult.update({
      'stdout': process_condenced_result['stdout'] + aborted_postfix,
      'stderr': process_condenced_result['stderr'] + aborted_postfix,
      'exitCode': process_condenced_result['exitcode'],
      'structuredOut': str(json.dumps(process_condenced_result['structuredOut'])) if 'structuredOut' in process_condenced_result else '',
      'status': status,
    })

    self.commandStatuses.put_command_status(handle.command, roleResult)

  def execute_status_command(self, command):
    '''
    Executes commands of type STATUS_COMMAND: asks the orchestrator for the
    component status and stores the resulting live-status report.
    '''
    try:
      cluster = command['clusterName']
      service = command['serviceName']
      component = command['componentName']
      configurations = command['configurations']
      if 'global' in configurations:
        globalConfig = configurations['global']
      else:
        globalConfig = {}

      livestatus = LiveStatus(cluster, service, component,
                              globalConfig, self.config, self.configTags)

      component_extra = None

      # For custom services, responsibility to determine service status is
      # delegated to python scripts
      component_status_result = self.customServiceOrchestrator.requestComponentStatus(command)

      if component_status_result['exitcode'] == 0:
        component_status = LiveStatus.LIVE_STATUS
      else:
        component_status = LiveStatus.DEAD_STATUS

      if 'structuredOut' in component_status_result:
        component_extra = component_status_result['structuredOut']

      # keyword 'forsed_component_status' (sic) must match LiveStatus.build
      result = livestatus.build(forsed_component_status= component_status)

      if component_extra is not None and len(component_extra) != 0:
        # 'alerts' is promoted to the top level of the report
        if 'alerts' in component_extra:
          result['alerts'] = component_extra['alerts']
          del component_extra['alerts']

        result['extra'] = component_extra

      logger.debug("Got live status for component " + component + \
                   " of service " + str(service) + \
                   " of cluster " + str(cluster))

      logger.debug(pprint.pformat(result))
      if result is not None:
        self.commandStatuses.put_command_status(command, result)
    except Exception as err:
      traceback.print_exc()
      logger.warn(err)


  # Store action result to agent response queue
  def result(self):
    """Return the accumulated command status report for the heartbeat."""
    return self.commandStatuses.generate_report()


  def status_update_callback(self):
    """
    Actions that are executed every time when command status changes
    """
    self.controller.heartbeat_wait_event.set()

  # Removes all commands from the queue
  def reset(self):
    """Drop every pending execution command."""
    queue = self.commandQueue
    with queue.mutex:
      queue.queue.clear()
